diff --git a/swh/web/ui/apidoc.py b/swh/web/ui/apidoc.py
index 18e2dd28b..13342a5f4 100644
--- a/swh/web/ui/apidoc.py
+++ b/swh/web/ui/apidoc.py
@@ -1,418 +1,440 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from functools import wraps
from enum import Enum
from flask import request, render_template, url_for
from flask import g
from swh.web.ui.main import app
class argtypes(Enum):  # noqa: N801
    """Labels describing the type of a documented route argument.

    The value of each member is the human-readable text stored as the
    argument's type in the documentation data.
    """
    ts = 'timestamp'
    int = 'integer'
    str = 'string'
    path = 'path'
    sha1 = 'sha1'
    uuid = 'uuid'
    sha1_git = 'sha1_git'
    algo_and_hash = 'algo_hash:hash'
class rettypes(Enum):  # noqa: N801
    """Labels describing the type of value a documented route returns.

    The value of each member is the human-readable text stored as the
    return type in the documentation data.
    """
    octet_stream = 'octet stream'
    list = 'list'
    dict = 'dict'
class excs(Enum):  # noqa: N801
    """Labels naming the exceptions a documented route may raise.

    The value of each member is the exception name stored in the
    documentation data.
    """
    badinput = 'BadInputExc'
    notfound = 'NotFoundExc'
class APIUrls(object):
    """Registry of the routes known to the API documentation.

    * Indexes every route documented through apidoc's decorators.
    * Groups the application's url rules by endpoint name, so that the
      documentation can list the urls related to a processing method.

    Relies on the load_controllers logic in main.py for initialization.
    """
    # route -> documentation entry (docstring plus any extra options)
    apidoc_routes = {}
    # endpoint name -> list of {'rule', 'methods'} dicts; built on first use
    method_endpoints = {}

    @classmethod
    def get_app_endpoints(cls):
        """Return the index of documented routes."""
        return cls.apidoc_routes

    @classmethod
    def get_method_endpoints(cls, fname):
        """Return the url rules bound to the endpoint named `fname`.

        The grouping is computed the first time it is needed and then
        reused. An unknown `fname` raises KeyError.
        """
        if not cls.method_endpoints:
            cls.method_endpoints = cls.group_routes_by_method()
        return cls.method_endpoints[fname]

    @classmethod
    def group_routes_by_method(cls):
        """Group URL endpoints according to their processing method.

        Returns:
            A dict where keys are the processing method names, and values
            are the routes that are bound to the key method.
        """
        grouped = {}
        for rule in app.url_map.iter_rules():
            grouped.setdefault(rule.endpoint, []).append(
                {'rule': rule.rule, 'methods': rule.methods})
        return grouped

    @classmethod
    def index_add_route(cls, route, docstring, **kwargs):
        """Add a route to the self-documenting API reference.

        A route that is already indexed is left untouched.
        """
        if route in cls.apidoc_routes:
            return
        entry = {'docstring': docstring}
        entry.update(kwargs)
        cls.apidoc_routes[route] = entry
class APIDocException(Exception):
    """Signal a misuse of the APIDoc decorators (wrong stacking order,
    missing docstring, unknown decorator in the stack, ...).
    """
class APIDocBase(object):
    """Base class of the API documentation decorators.

    It holds what links the decorator stack together:

    * the `inner_dec` property, i.e. the decorator sitting directly below
      self in the decorator tower;
    * the logic deciding how to call the wrapped function depending on
      whether self is the last decorator applied to the API function.
    """
    def __init__(self):
        self._inner_dec = None

    @property
    def inner_dec(self):
        return self._inner_dec

    @inner_dec.setter
    def inner_dec(self, instance):
        self._inner_dec = instance

    @property
    def data(self):
        raise NotImplementedError

    def process_rv(self, f, args, kwargs):
        """Call f appropriately, depending on whether it is another apidoc
        wrapper or the decorated API function itself.

        A function declaring an `outer_decorator` variable is treated as
        an apidoc wrapper and receives kwargs unchanged. Any other function
        is called without the `outer_decorator` entry; a TypeError or
        KeyError raised by that call is taken to mean a documentation call
        and yields None.
        """
        if 'outer_decorator' in f.__code__.co_varnames:
            return f(*args, **kwargs)

        forwarded = dict(kwargs)
        forwarded.pop('outer_decorator', None)
        try:
            return f(*args, **forwarded)
        except (TypeError, KeyError):  # documentation call
            return None

    def maintain_stack(self, f, args, kwargs):
        """Link self below the decorator that called it, then call f.

        The caller is found in kwargs['outer_decorator']; its absence means
        no @apidoc.route bootstrapped the stack and is reported with an
        APIDocException. Self then becomes the `outer_decorator` handed to
        the next call.
        """
        if 'outer_decorator' not in kwargs:
            raise APIDocException('Apidoc %s: expected an apidoc'
                                  ' route decorator first'
                                  % self.__class__.__name__)
        caller = kwargs['outer_decorator']
        caller.inner_dec = self
        kwargs['outer_decorator'] = self
        return self.process_rv(f, args, kwargs)
class route(APIDocBase): # noqa: N801
"""Decorate an API method to register it in the API doc route index
and create the corresponding Flask route.
This decorator is responsible for bootstrapping the linking of subsequent
decorators, as well as traversing the decorator stack to obtain the
documentation data from it.
Args:
route: documentation page's route
noargs: set to True if the route has no arguments, and its
result should be displayed anytime its documentation
is requested. Default to False
tags: Further information on api endpoints. Two values are
possibly expected:
- hidden: remove the entry points from the listing
- upcoming: display the entry point but it is not followable
"""
# NOTE(review): `tags=[]` is a mutable default argument. It is only read
# here (copied into a set), never mutated, so the shared list is harmless.
def __init__(self, route, noargs=False, tags=[]):
super().__init__()
self.route = route
self.noargs = noargs
self.tags = set(tags)
# Index the route in APIUrls (unless tagged 'hidden'), wrap f so that each
# call starts the decorator linking with self as outer decorator, and
# register the documentation url rule when the route takes arguments.
def __call__(self, f):
options = {}
if 'hidden' not in self.tags:
if self.tags:
options['tags'] = self.tags
APIUrls.index_add_route(self.route, f.__doc__, **options)
@wraps(f)
def doc_func(*args, **kwargs):
kwargs['outer_decorator'] = self
rv = self.process_rv(f, args, kwargs)
return self.compute_return(f, rv)
if not self.noargs:
app.add_url_rule(self.route, f.__name__, doc_func)
return doc_func
# Tell whether an endpoint rule is kept in the documented url list: a rule
# matching route_re whose methods are exactly GET/HEAD/OPTIONS is rejected
# when the route takes arguments (noargs is false).
def filter_api_url(self, endpoint, route_re, noargs):
doc_methods = {'GET', 'HEAD', 'OPTIONS'}
if re.match(route_re, endpoint['rule']):
if endpoint['methods'] == doc_methods and not noargs:
return False
return True
def build_examples(self, f, urls, args):
"""Build example documentation.
Args:
f: function
urls: information relative to url for that function
args: information relative to arguments for that function
Returns:
list of example urls without duplicates; an url is built with
url_for from the default value of every argument whose name
appears in the rule
"""
s = set()
r = []
for data_url in urls:
url = data_url['rule']
defaults = {arg['name']: arg['default']
for arg in args
if arg['name'] in url}
if defaults:
url = url_for(f.__name__, **defaults)
if url in s:
continue
s.add(url)
r.append(url)
return r
def compute_return(self, f, rv):
"""Build the documentation data of f and choose what to return:
the rendered documentation page or rv, the result of f.
Raises:
APIDocException if f has no docstring
"""
data = self.data
if not f.__doc__:
raise APIDocException('Apidoc %s: expected a docstring'
' for function %s'
% (self.__class__.__name__, f.__name__))
data['docstring'] = f.__doc__
route_re = re.compile('.*%s$' % data['route'])
endpoint_list = APIUrls.get_method_endpoints(f.__name__)
data['urls'] = [url for url in endpoint_list if
self.filter_api_url(url, route_re, data['noargs'])]
if 'args' in data:
data['examples'] = self.build_examples(
f, data['urls'], data['args'])
# Documentation request (GET on the documentation route of an endpoint
# that takes arguments): render the documentation page. Otherwise the
# documentation data is stored on flask.g and f's own result is returned.
if re.match(route_re, request.url) and not data['noargs'] \
and request.method == 'GET':
return app.response_class(
render_template('apidoc.html', **data),
content_type='text/html')
g.doc_env = data # Store for response processing
return rv
# Walk the decorator stack through the inner_dec links and collect each
# decorator's data under 'args', 'excs', 'return', 'headers' or 'params'.
@property
def data(self):
data = {'route': self.route, 'noargs': self.noargs}
doc_instance = self.inner_dec
while doc_instance:
if isinstance(doc_instance, arg):
if 'args' not in data:
data['args'] = []
data['args'].append(doc_instance.data)
elif isinstance(doc_instance, raises):
if 'excs' not in data:
data['excs'] = []
data['excs'].append(doc_instance.data)
elif isinstance(doc_instance, returns):
data['return'] = doc_instance.data
elif isinstance(doc_instance, header):
if 'headers' not in data:
data['headers'] = []
data['headers'].append(doc_instance.data)
+ elif isinstance(doc_instance, param):
+ if 'params' not in data:
+ data['params'] = []
+
+ data['params'].append(doc_instance.data)
else:
raise APIDocException('Unknown API documentation decorator')
doc_instance = doc_instance.inner_dec
return data
-class arg(APIDocBase):
class BaseDescribeDocBase(APIDocBase):
    """Shared behaviour of the decorators describing the optional
    inputs/outputs of a route.

    A subclass fills `doc_dict` in its constructor; this class exposes it
    through `data` and provides the wrapper keeping the decorator stack
    linked.
    """
    def __init__(self):
        super().__init__()
        self.doc_dict = None

    def __call__(self, f):
        @wraps(f)
        def linked(*args, outer_decorator=None, **kwargs):
            # `outer_decorator` must stay a declared parameter name: the
            # parent's process_rv looks it up in the code object's variables.
            kwargs['outer_decorator'] = outer_decorator
            return self.maintain_stack(f, args, kwargs)
        return linked

    @property
    def data(self):
        return self.doc_dict
+
+
class arg(BaseDescribeDocBase):
    """Describe one argument of an API method on the documentation page
    declared by the @route decorator above it.

    Args:
        name: the argument's name. MUST match the method argument's name to
            create the example request URL.
        default: the argument's default value
        argtype: the argument's type as an Enum value from apidoc.argtypes
        argdoc: the argument's documentation string
    """
    def __init__(self, name, default, argtype, argdoc):
        super().__init__()
        self.doc_dict = dict(
            name=name,
            type=argtype.value,
            doc=argdoc,
            default=default,
        )
- self.inner_dec = None
-
- def __call__(self, f):
- @wraps(f)
- def arg_fun(*args, outer_decorator=None, **kwargs):
- kwargs['outer_decorator'] = outer_decorator
- return self.maintain_stack(f, args, kwargs)
- return arg_fun
- @property
- def data(self):
- return self.doc_dict
-
-class header(APIDocBase):
class header(BaseDescribeDocBase):
    """Describe a header the API method may set on its response.

    Args:
        name: the header name
        doc: the information about that header
    """
    def __init__(self, name, doc):
        super().__init__()
        self.doc_dict = dict(name=name, doc=doc)
- def __call__(self, f):
- @wraps(f)
- def arg_fun(*args, outer_decorator=None, **kwargs):
- kwargs['outer_decorator'] = outer_decorator
- return self.maintain_stack(f, args, kwargs)
- return arg_fun
- @property
- def data(self):
- return self.doc_dict
class param(BaseDescribeDocBase):
    """Describe a query parameter the API method may accept.

    Args:
        name: the parameter name
        default: the parameter's default value
        doc: the information about that parameter
    """
    def __init__(self, name, default, doc):
        super().__init__()
        self.doc_dict = dict(name=name, default=default, doc=doc)
class raises(APIDocBase):  # noqa: N801
    """Describe an exception the API method may raise.

    Args:
        exc: the exception name as an Enum value from apidoc.excs
        doc: the exception's documentation string
    """
    def __init__(self, exc, doc):
        super().__init__()
        self.exc_dict = dict(exc=exc.value, doc=doc)

    def __call__(self, f):
        @wraps(f)
        def linked(*args, outer_decorator=None, **kwargs):
            # Hand the caller over through kwargs, where maintain_stack
            # expects to find it.
            kwargs['outer_decorator'] = outer_decorator
            return self.maintain_stack(f, args, kwargs)
        return linked

    @property
    def data(self):
        return self.exc_dict
class returns(APIDocBase):  # noqa: N801
    """Decorate an API method to display information about its return value.

    Args:
        rettype: the return value's type as an Enum value from
            apidoc.rettypes, or None when the type is left unspecified
        retdoc: the return value's documentation string
    """
    def __init__(self, rettype=None, retdoc=None):
        super().__init__()
        # The signature advertises None as default: do not dereference
        # `.value` on it, record an unspecified type instead.
        self.return_dict = {
            'type': rettype.value if rettype is not None else None,
            'doc': retdoc
        }

    def __call__(self, f):
        @wraps(f)
        def ret_fun(*args, outer_decorator=None, **kwargs):
            # Hand the caller over through kwargs, where maintain_stack
            # expects to find it.
            kwargs['outer_decorator'] = outer_decorator
            return self.maintain_stack(f, args, kwargs)
        return ret_fun

    @property
    def data(self):
        return self.return_dict
diff --git a/swh/web/ui/templates/apidoc.html b/swh/web/ui/templates/apidoc.html
index 9e89a78dc..c2cb1c214 100644
--- a/swh/web/ui/templates/apidoc.html
+++ b/swh/web/ui/templates/apidoc.html
@@ -1,108 +1,119 @@
{% extends "layout.html" %}
{% block title %}Software Heritage API{% endblock %}
{% block content %}
{% if docstring %}
Overview
{% autoescape off %} {{ docstring | safe_docstring_display }} {% endautoescape %}
{% endif %}
{% if response_data and response_data is not none %}
Request
{{ request.method }} {{ request.url }}
Result
{% autoescape off %}{{ response_data | urlize_api_links }}{% endautoescape %}
{% if headers_data and headers_data is not none %}
Headers
{% endif %}
{% endif %}
URL |
Allowed Methods |
{% for url in urls %}
{{ url['rule'] }}
|
{{ url['methods'] | sort | join(', ') }}
|
{% endfor %}
{% if args and args|length > 0 %}
Args
{% for arg in args %}
- {{ arg['name'] }}: {{ arg['type'] }}
- {% autoescape off %} {{ arg['doc'] | safe_docstring_display }} {% endautoescape %}
{% endfor %}
{% endif %}
+{% if params and params|length > 0 %}
+
+
Params
+
+ {% for param in params %}
+ - {{ param['name'] }}: string
+ - {% autoescape off %} {{ param['doc'] | safe_docstring_display }} {% endautoescape %}
+ {% endfor %}
+
+
+{% endif %}
{% if excs and excs|length > 0 %}
Raises
{% for exc in excs %}
- {{ exc['exc'] }}
- {% autoescape off %} {{ exc['doc'] | safe_docstring_display }} {% endautoescape %}
{% endfor %}
{% endif %}
{% if headers %}
{% endif %}
{% if return %}
Returns
- {{ return['type'] }}
- {% autoescape off %} {{ return['doc'] | safe_docstring_display }} {% endautoescape %}
{% endif %}
{% if examples %}
Examples
{% for example in examples %}
-
{{ example }}
{% endfor %}
{% endif %}
{% endblock %}
diff --git a/swh/web/ui/tests/test_apidoc.py b/swh/web/ui/tests/test_apidoc.py
index 77930edc6..6f0237f87 100644
--- a/swh/web/ui/tests/test_apidoc.py
+++ b/swh/web/ui/tests/test_apidoc.py
@@ -1,125 +1,127 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from nose.tools import istest, nottest
from swh.web.ui import apidoc
from swh.web.ui.main import app
from swh.web.ui.tests.test_app import SWHApidocTestCase
# Tests of the apidoc decorators. The decorated helper functions below are
# declared in the class body, so their routes are registered on `app` when
# this class is defined; @nottest keeps them from being collected as tests.
class APIDocTestCase(SWHApidocTestCase):
# Stub data describing arguments, exceptions, url rules and return values.
def setUp(self):
self.arg_dict = {
'name': 'my_pretty_arg',
'default': 'some default value',
'type': apidoc.argtypes.sha1,
'doc': 'this arg does things'
}
self.stub_excs = [{'exc': apidoc.excs.badinput,
'doc': 'My exception documentation'}]
self.stub_args = [{'name': 'stub_arg',
'default': 'some_default'}]
self.stub_rule_list = [
{'rule': 'some/route/with/args/',
'methods': {'GET', 'HEAD', 'OPTIONS'}},
{'rule': 'some/doc/route/',
'methods': {'GET', 'HEAD', 'OPTIONS'}},
{'rule': 'some/other/route/',
'methods': {'GET', 'HEAD', 'OPTIONS'}}
]
self.stub_return = {
'type': apidoc.rettypes.dict.value,
'doc': 'a dict with amazing properties'
}
# Documented route whose function has no docstring.
@apidoc.route('/my/nodoc/url/')
@nottest
def apidoc_nodoc_tester(arga, argb):
return arga + argb
# Requesting the docstring-less route is expected to raise.
@istest
def apidoc_nodoc_failure(self):
with self.assertRaises(Exception):
self.client.get('/my/nodoc/url/')
# Stacking @apidoc.arg above @apidoc.route is expected to raise an
# AssertionError while the function is being decorated.
@istest
def apidoc_badorder_failure(self):
with self.assertRaises(AssertionError):
@app.route('/my/badorder/url//')
@apidoc.arg('foo',
default=True,
argtype=apidoc.argtypes.int,
argdoc='It\'s so fluffy!')
@apidoc.route('/my/badorder/url/')
@nottest
def apidoc_badorder_tester(foo, bar=0):
"""
Some irrelevant doc since the decorators are bad
"""
return foo + bar
# Endpoint documented with @apidoc.route only.
@app.route('/some///')
@apidoc.route('/some/doc/route/')
@nottest
def apidoc_route_tester(myarg, myotherarg, akw=0):
"""
Sample doc
"""
return {'result': myarg + myotherarg + akw}
# The documentation route renders the apidoc.html template.
@istest
def apidoc_route_doc(self):
# when
rv = self.client.get('/some/doc/route/')
# then
self.assertEqual(rv.status_code, 200)
self.assert_template_used('apidoc.html')
# The functional route still answers.
@istest
def apidoc_route_fn(self):
# when
rv = self.client.get('/some/1/1/')
# then
self.assertEqual(rv.status_code, 200)
# Endpoint documented with the full decorator stack: route, arg, param,
# header, raises and returns.
@app.route('/some/full///')
@apidoc.route('/some/complete/doc/route/')
@apidoc.arg('myarg',
default=67,
argtype=apidoc.argtypes.int,
argdoc='my arg')
+ @apidoc.param('limit', default=10, doc='Result limitation')
+ @apidoc.header('Link', doc='Header link returns for pagination purpose')
@apidoc.raises(exc=apidoc.excs.badinput, doc='Oops')
@apidoc.returns(rettype=apidoc.rettypes.dict,
retdoc='sum of args')
@nottest
def apidoc_full_stack_tester(myarg, myotherarg, akw=0):
"""
Sample doc
"""
return {'result': myarg + myotherarg + akw}
# The documentation route of the full stack renders the apidoc.html template.
@istest
def apidoc_full_stack_doc(self):
# when
rv = self.client.get('/some/complete/doc/route/')
# then
self.assertEqual(rv.status_code, 200)
self.assert_template_used('apidoc.html')
# The functional route of the full stack still answers.
@istest
def apidoc_full_stack_fn(self):
# when
rv = self.client.get('/some/full/1/1/')
# then
self.assertEqual(rv.status_code, 200)
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index 6ececc102..b1dc2803a 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,1025 +1,1031 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(
    rettype=doc.rettypes.dict,
    retdoc="A dictionary of SWH's most important statistics")
def api_stats():
    """Return statistics on SWH storage.
    """
    # Straight pass-through of the service counters.
    counters = service.stat_counters()
    return counters
@app.route('/api/1/origin//visits/')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='The requested SWH origin identifier')
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""All instances of visits of the origin pointed by
             origin_id as POSIX time since epoch (if visit_id is not defined)
             """)
def api_origin_visits(origin_id):
    """Return a list of origin visit (dict) for that particular origin
    including date (visit date as posix timestamp), target,
    target_type, status, ...
    """
    def _with_visit_url(visit):
        # Work on a copy and add the url of the single-visit endpoint.
        enriched = visit.copy()
        enriched['origin_visit_url'] = url_for(
            'api_origin_visit',
            origin_id=enriched['origin'],
            visit_id=enriched['visit'])
        return enriched

    not_found_msg = 'No origin %s found' % origin_id
    return _api_lookup(origin_id,
                       service.lookup_origin_visits,
                       error_msg_if_not_found=not_found_msg,
                       enrich_fn=_with_visit_url)
@app.route('/api/1/origin//visit//')
@doc.route('/api/1/origin/visit/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='The requested SWH origin identifier')
@doc.arg('visit_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='The requested SWH origin visit identifier')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if no visit that match the query is found')
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""The single instance visit visit_id of the origin pointed
             by origin_id as POSIX time since epoch""")
def api_origin_visit(origin_id, visit_id):
    """Return origin visit (dict) for that particular origin including
    (but not limited to) date (visit date as posix timestamp),
    target, target_type, status, ...
    """
    def _with_links(visit):
        # Work on a copy: add the origin url and enrich each occurrence.
        enriched = visit.copy()
        enriched['origin_url'] = url_for('api_origin',
                                         origin_id=enriched['origin'])
        if 'occurrences' in enriched:
            occurrences = {}
            for name, target in enriched['occurrences'].items():
                occurrences[name] = utils.enrich_object(target)
            enriched['occurrences'] = occurrences
        return enriched

    not_found_msg = 'No visit %s for origin %s found' % (visit_id, origin_id)
    # visit_id is forwarded to the lookup as supplementary argument.
    return _api_lookup(origin_id,
                       service.lookup_origin_visit,
                       not_found_msg,
                       _with_links,
                       visit_id)
@app.route('/api/1/content/symbol/', methods=['POST'])
@app.route('/api/1/content/symbol//')
@doc.route('/api/1/content/symbol/', tags=['upcoming'])
@doc.arg('q',
default='hello',
argtype=doc.argtypes.str,
argdoc="""An expression string to lookup in swh's raw content""")
-@doc.header('Link', doc="""Optional 'Link' header proposed to the api consumer
- for navigation purpose. possible are 'next'
- or 'previous' page.""")
+@doc.header('Link',
+ doc="""Optional 'Link' header proposed to the api consumer
+ for navigation purpose. possible are 'next'
+ or 'previous' page.""")
+@doc.param('per_page', default=10,
+ doc="""Optional parameter which permits
+ to limit the number of data to retrieve on a per request
+ basis.
+ The default is 10, up to 50 max.""")
@doc.returns(rettype=doc.rettypes.list,
retdoc="""A list of dict whose content matches the expression.
Each dict has the following keys:
- id (bytes): identifier of the content
- name (text): symbol whose content match the expression
- kind (text): kind of the symbol that matched
- lang (text): Language for that entry
- line (int): Number line for the symbol
The result is paginated by page of 10 results. The
'Link' header gives the relation to follow for the next
and eventually the previous page.
""")
def api_content_symbol(q=None):
"""Search symbol in indexed content's data.
The result is paginated and and Link header will give the next and
previous link to have the next result.
"""
result = {}
# Pagination state is read from the query string: `last_sha1` (absent on
# the first page) and `per_page` (10 when absent).
# NOTE(review): int() raises ValueError on a non-numeric `per_page`, and no
# upper bound is applied here -- confirm where the documented max is enforced.
last_sha1 = request.args.get('last_sha1', None)
per_page = int(request.args.get('per_page', '10'))
# Bind the pagination state so the lookup only takes the expression.
def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page):
return service.lookup_expression(exp, last_sha1, per_page)
symbols = _api_lookup(
q,
lookup_fn=lookup_exp,
error_msg_if_not_found='No indexed raw content match expression \''
'%s\'.' % q,
enrich_fn=lambda x: utils.enrich_content(x, top_url=True))
# A full page (as many results as per_page) gets a 'link-next' url that
# resumes after the sha1 of the last returned symbol; `per_page` is carried
# over only when the client provided it.
if symbols:
l = len(symbols)
url = url_for('api_content_symbol', q=q)
headers = {}
if l == per_page:
new_last_sha1 = symbols[-1]['sha1']
params = [('last_sha1', new_last_sha1)]
nb_per_page = request.args.get('per_page')
if nb_per_page:
params.append(('per_page', nb_per_page))
headers['link-next'] = utils.to_url(url, params)
result['headers'] = headers
result.update({
'results': symbols
})
return result
@app.route('/api/1/content/known/', methods=['POST'])
@app.route('/api/1/content/known//')
@doc.route('/api/1/content/known/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
         sha1_git or sha256 and hash is the hash to search for in SWH""")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""A dict with keys:
             - search_res: a list of dicts corresponding to queried content
             with key 'found' to True if found, 'False' if not
             - search_stats: a dict containing number of files searched and
             percentage of files found
             """)
def api_check_content_known(q=None):
    """Search a content per hash.

    This may take the form of:
    - a GET request with many hashes (limited to the size of parameter
      we can pass in url)
    - a POST request with many hashes, with the request body containing
      identifiers (typically filenames) as keys and corresponding hashes
      as values.
    """
    queries = []
    if q:
        # GET: many comma-separated hashes in the url
        queries = [{'filename': None, 'sha1': sha1} for sha1 in q.split(',')]
    elif request.method == 'POST':
        # POST: many hashes in the submitted form
        for key, value in request.form.items():
            # Skip inputs with no associated value
            if not value:
                continue
            # The 'q' field carries a bare hash; any other field name is
            # the identifier (filename) of its hash.
            filename = None if key == 'q' else key
            queries.append({'filename': filename, 'sha1': value})

    search_res = None
    search_stats = {'nbfiles': 0, 'pct': 0}
    if queries:
        # Consume the lookup once and count on the materialized list, so
        # the statistics stay right even if the service returns an iterator.
        search_res = [{'filename': el['filename'],
                       'sha1': el['sha1'],
                       'found': el['found']}
                      for el in service.lookup_multiple_hashes(queries)]
        nb_files = len(queries)
        nb_found = sum(1 for el in search_res if el['found'])
        search_stats['nbfiles'] = nb_files
        search_stats['pct'] = (nb_found / nb_files) * 100

    return {'search_res': search_res,
            'search_stats': search_stats}
def _api_lookup(criteria,
                lookup_fn,
                error_msg_if_not_found,
                enrich_fn=lambda x: x,
                *args):
    """Factor out the recurring lookup-then-enrich sequence:

    - query the backend through lookup_fn with a criteria (be it an
      identifier or a checksum);
    - when the lookup result is falsy, raise a NotFoundExc carrying
      error_msg_if_not_found;
    - otherwise apply enrich_fn: to every element when the result is a
      list, a map or a generator (the outcome is then a list), or to the
      result itself in any other case (e.g. a dict).

    Args:
        - criteria: discriminating criteria to lookup
        - lookup_fn: function expecting the criteria and the optional
          supplementary *args.
        - error_msg_if_not_found: message of the NotFoundExc raised when
          nothing matches the criteria.
        - enrich_fn: function used to enrich what lookup_fn returned.
          Defaults to the identity function.
        - *args: supplementary arguments to pass to lookup_fn.

    Raises:
        NotFoundExc or whatever `lookup_fn` raises.
    """
    found = lookup_fn(criteria, *args)
    if not found:
        raise NotFoundExc(error_msg_if_not_found)
    if not isinstance(found, (map, list, GeneratorType)):
        return enrich_fn(found)
    return list(map(enrich_fn, found))
@app.route('/api/1/origin//')
@app.route('/api/1/origin//url//')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The origin's SWH origin_id.")
@doc.arg('origin_type',
         default='git',
         argtype=doc.argtypes.str,
         argdoc="The origin's type (git, svn..)")
@doc.arg('origin_url',
         default='https://github.com/hylang/hy',
         argtype=doc.argtypes.path,
         argdoc="The origin's URL.")
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if origin_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the origin identified by origin_id')
def api_origin(origin_id=None, origin_type=None, origin_url=None):
    """Return information about the origin matching the passed criteria.

    Criteria may be:
    - An SWH-specific ID, if you already know it
    - An origin type and its URL, if you do not have the origin's SWH
      identifier
    """
    # Keep only the criteria that were actually provided.
    criteria = {}
    for key, value in (('id', origin_id),
                       ('type', origin_type),
                       ('url', origin_url)):
        if value:
            criteria[key] = value

    if 'id' not in criteria:
        error_msg = 'Origin with type %s and URL %s not found' % (
            criteria['type'], criteria['url'])
    else:
        error_msg = 'Origin with id %s not found.' % criteria['id']

    def _with_visits_url(origin):
        # Only an origin carrying an id can link to its visits.
        if 'id' not in origin:
            return origin
        enriched = origin.copy()
        enriched['origin_visits_url'] = url_for('api_origin_visits',
                                                origin_id=enriched['id'])
        return enriched

    return _api_lookup(criteria,
                       lookup_fn=service.lookup_origin,
                       error_msg_if_not_found=error_msg,
                       enrich_fn=_with_visits_url)
@app.route('/api/1/person//')
@doc.route('/api/1/person/')
@doc.arg('person_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The person's SWH identifier")
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if person_id does not correspond to a person in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
    """Return information about person with identifier person_id.
    """
    # No enrichment: the looked-up person is returned as is.
    return _api_lookup(
        person_id, lookup_fn=service.lookup_person,
        error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release//')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
         default='97d8dcd0c589b1d94a5d26cf0c1e8f2f44b92bfd',
         argtype=doc.argtypes.sha1_git,
         argdoc="The release's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if the argument is not a sha1')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if sha1_git does not correspond to a release in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
    """Return information about release with id sha1_git.
    """
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_release,
        error_msg_if_not_found=(
            'Release with sha1_git %s not found.' % sha1_git),
        enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
                           limit=100, with_data=False):
    """Look up the directory or content found at `path` in the revision
    matching the given criteria, and enrich what was found.

    Args:
        revision: dictionary of criteria identifying the revision to look up
        path: path of the directory or content to look up
        request_path: path of the originating request, handed over as
            context url when enriching directory entries
        limit: forwarded to the service lookup (default to 100)
        with_data: forwarded to the service lookup; indicates to retrieve
            the content's raw data if path resolves to a content.

    Returns:
        The lookup result, whose 'content' entry is replaced by the list of
        enriched directory entries when its 'type' is 'dir', by the
        enriched content otherwise.
    """
    # The revision id returned alongside the result is not used here.
    _rev_id, result = service.lookup_directory_through_revision(
        revision, path, limit=limit, with_data=with_data)

    content = result['content']
    if result['type'] != 'dir':  # content
        result['content'] = utils.enrich_content(content)
    else:  # dir_entries
        result['content'] = [utils.enrich_directory(entry, request_path)
                             for entry in content]
    return result
@app.route('/api/1/revision'
           '/origin/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/directory//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/directory//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory//')
@doc.route('/api/1/revision/origin/directory/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default
         to master)""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""Optional timestamp (default to the nearest time
         crawl of timestamp)""")
@doc.arg('path',
         default='Dockerfile',
         argtype=doc.argtypes.path,
         argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the passed criteria was
            not found""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision corresponding to the
             passed criteria""")
def api_directory_through_revision_origin(origin_id,
                                          branch_name="refs/heads/master",
                                          ts=None,
                                          path=None,
                                          with_data=False):
    """Display directory or content information through a revision identified
    by origin/branch/timestamp.
    """
    # The timestamp comes in as text; parse it only when one was given.
    if ts:
        ts = utils.parse_timestamp(ts)

    revision_criteria = {
        'origin_id': origin_id,
        'branch_name': branch_name,
        'ts': ts
    }
    # request.path is handed over as the context of the lookup.
    return _revision_directory_by(revision_criteria,
                                  path,
                                  request.path,
                                  with_data=with_data)
@app.route('/api/1/revision'
           '/origin//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts//')
@app.route('/api/1/revision'
           '/origin/'
           '/ts//')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The queried revision's origin identifier in SWH")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default
         to master)""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="The time at which the queried revision should be constrained")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching given criteria was not found
            in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision identified by the given
             criteria""")
def api_revision_with_origin(origin_id,
                             branch_name="refs/heads/master",
                             ts=None):
    """Display revision information through its identification by
    origin/branch/timestamp.
    """
    # The timestamp comes in as text; parse it only when one was given.
    if ts:
        ts = utils.parse_timestamp(ts)

    not_found_msg = ('Revision with (origin_id: %s, branch_name: %s'
                     ', ts: %s) not found.' % (origin_id,
                                               branch_name,
                                               ts))
    # branch_name and ts are forwarded to the lookup as supplementary
    # arguments.
    return _api_lookup(origin_id,
                       service.lookup_revision_by,
                       not_found_msg,
                       utils.enrich_revision,
                       branch_name,
                       ts)
@app.route('/api/1/revision//')
@app.route('/api/1/revision//prev//')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier")
@doc.arg('context',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the revision identified by sha1_git')
def api_revision(sha1_git, context=None):
    """Return information about revision with id sha1_git.
    """
    def _enrich_with_context(revision):
        # The navigation context of the request is passed to the enrichment.
        return utils.enrich_revision(revision, context)

    not_found_msg = 'Revision with sha1_git %s not found.' % sha1_git
    return _api_lookup(sha1_git,
                       service.lookup_revision,
                       not_found_msg,
                       _enrich_with_context)
@app.route('/api/1/revision//raw/')
@doc.route('/api/1/revision/raw/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
    """Return the raw message of the revision identified by sha1_git.

    Args:
        sha1_git: identifier of the revision whose message is requested.

    Returns:
        An app.response_class instance carrying the 'message' entry of
        service.lookup_revision_message's result, served as an
        'application/octet-stream' attachment.

    """
    # Look the message up first so lookup errors surface before any
    # response is assembled.
    message = service.lookup_revision_message(sha1_git)['message']

    attachment_headers = {
        'Content-disposition': 'attachment;filename=rev_%s_raw' % sha1_git,
    }
    return app.response_class(message,
                              headers=attachment_headers,
                              mimetype='application/octet-stream')
@app.route('/api/1/revision//directory/')
@app.route('/api/1/revision//directory//')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier.")
@doc.arg('dir_path',
         default='Documentation/BUG-HUNTING',
         argtype=doc.argtypes.path,
         argdoc='The path from the top level directory')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching sha1_git was not found in SWH
, or if the path specified does not exist""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the directory pointed by revision id
sha1-git and dir_path""")
def api_revision_directory(sha1_git,
                           dir_path=None,
                           with_data=False):
    """Return information on the directory pointed to by a revision.

    Without dir_path the revision's top level directory is displayed;
    otherwise the directory designated by dir_path is (if it exists).

    Args:
        sha1_git: identifier of the revision.
        dir_path: optional path relative to the revision's root directory.
        with_data: forwarded as-is to _revision_directory_by.

    Returns:
        The result of _revision_directory_by for the current request path.

    """
    revision_criteria = {'sha1_git': sha1_git}
    return _revision_directory_by(revision_criteria,
                                  dir_path,
                                  request.path,
                                  with_data=with_data)
@app.route('/api/1/revision//log/')
@app.route('/api/1/revision//prev//log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's identifier queried")
@doc.arg('prev_sha1s',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc="""(Optional) Navigation breadcrumbs (descendant revisions
previously visited). If multiple values, use / as delimiter. """)
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git or prev_sha1s is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The log data starting at the revision identified by
sha1_git, completed with the navigation breadcrumbs,
if any""")
def api_revision_log(sha1_git, prev_sha1s=None):
    """Show all revisions (~git log) starting from sha1_git.

    The first element returned is the given sha1_git, or the first
    breadcrumb, if any.

    The result is paginated. To browse for the following revisions,
    use the link mentioned in the 'next_revs_url' key.

    Args:
        sha1_git: identifier of the revision the log starts from.
        prev_sha1s: optional '/'-separated revision identifiers
            (breadcrumbs) whose revisions are prepended to the log.

    Returns:
        A dict with keys 'revisions' (the list of revisions) and
        'next_revs_url' (link to the next page, or None).

    """
    page_size = app.config['conf']['max_log_revs']
    # One extra revision is requested: its presence signals a next page
    # and its id becomes that page's starting point.
    fetch_size = page_size + 1

    def _lookup_log_page(s, limit=fetch_size):
        return service.lookup_revision_log(s, limit)

    not_found_msg = 'Revision with sha1_git %s not found.' % sha1_git

    fetched = _api_lookup(sha1_git,
                          _lookup_log_page,
                          not_found_msg,
                          utils.enrich_revision)

    next_revs_url = None
    older_revs = fetched
    if len(fetched) == fetch_size:
        older_revs = fetched[:-1]
        next_revs_url = url_for('api_revision_log',
                                sha1_git=fetched[-1]['id'])

    if prev_sha1s:
        # Breadcrumbs present: resolve them and put them ahead of the log.
        breadcrumb_revs = _api_lookup(prev_sha1s.split('/'),
                                      service.lookup_revision_multiple,
                                      not_found_msg,
                                      utils.enrich_revision)
        revisions = breadcrumb_revs + older_revs
    else:
        revisions = older_revs

    return {'revisions': revisions, 'next_revs_url': next_revs_url}
@app.route('/api/1/revision'
           '/origin//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/ts//log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""(Optional) The revision's branch name within the origin specified.
Defaults to 'refs/heads/master'.""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""(Optional) A time or timestamp string to parse""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the given criteria was not
found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision log starting at the revision
matching the given criteria.""")
def api_revision_log_by(origin_id,
                        branch_name='refs/heads/master',
                        ts=None):
    """Show all revisions (~git log) starting from the revision targeted
    by the origin_id provided and optionally a branch name or/and a
    timestamp.

    The result is paginated. To browse the following revisions, use
    the link mentioned in the 'next_revs_url' key.

    Args:
        origin_id: identifier of the origin.
        branch_name: branch to follow; defaults to 'refs/heads/master'.
        ts: optional timestamp string, converted with
            utils.parse_timestamp when truthy.

    Returns:
        A dict with keys 'revisions' (the list of revisions) and
        'next_revs_url' (link to the next page, or None).

    """
    page_size = app.config['conf']['max_log_revs']
    # Ask for one revision beyond the page size to detect a next page.
    fetch_size = page_size + 1

    if ts:
        ts = utils.parse_timestamp(ts)

    def _lookup_log_page(o_id, br, ts, limit=fetch_size):
        return service.lookup_revision_log_by(o_id, br, ts, limit)

    not_found_msg = ''.join([
        'No revision matching origin %s ' % origin_id,
        ', branch name %s' % branch_name,
        (' and time stamp %s.' % ts) if ts else '.',
    ])

    fetched = _api_lookup(origin_id,
                          _lookup_log_page,
                          not_found_msg,
                          utils.enrich_revision,
                          branch_name,
                          ts)

    next_revs_url = None
    revisions = fetched
    if len(fetched) == fetch_size:
        revisions = fetched[:-1]
        next_revs_url = url_for('api_revision_log',
                                sha1_git=fetched[-1]['id'])

    return {'revisions': revisions, 'next_revs_url': next_revs_url}
@app.route('/api/1/directory//')
@app.route('/api/1/directory///')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
         default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
         default='codec/demux',
         argtype=doc.argtypes.path,
         argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata and contents of the directory identified by
sha1_git""")
def api_directory(sha1_git,
                  path=None):
    """Return information about the directory whose id is sha1_git.

    Args:
        sha1_git: identifier of the directory to look up.
        path: optional path relative to that directory. When truthy, the
            lookup goes through service.lookup_directory_with_path.

    Returns:
        Whatever _api_lookup returns, enriched with utils.enrich_directory.

    """
    if not path:
        # No sub-path requested: plain directory lookup.
        return _api_lookup(
            sha1_git,
            service.lookup_directory,
            'Directory with sha1_git %s not found.' % sha1_git,
            utils.enrich_directory)

    entry_not_found_msg = ('Entry with path %s relative to directory '
                           'with sha1_git %s not found.') % (path, sha1_git)
    return _api_lookup(
        sha1_git,
        service.lookup_directory_with_path,
        entry_not_found_msg,
        utils.enrich_directory,
        path)
@app.route('/api/1/provenance//')
@doc.route('/api/1/provenance/', tags=['hidden'])
@doc.arg('q',
         default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""List of provenance information (dict) for the matched
content.""")
def api_content_provenance(q):
    """Return the content's provenance information, if any.

    Args:
        q: the content query string handed to
            service.lookup_content_provenance.

    Returns:
        Whatever _api_lookup returns; each provenance dict is copied and
        completed with 'revision_url', 'content_url', 'origin_url',
        'origin_visits_url' and 'origin_visit_url' links.

    """
    def _add_links(provenance):
        # Work on a copy so the looked-up record is left untouched.
        linked = provenance.copy()
        linked['revision_url'] = url_for(
            'api_revision', sha1_git=provenance['revision'])
        linked['content_url'] = url_for(
            'api_content_metadata',
            q='sha1_git:%s' % provenance['content'])

        origin = provenance['origin']
        linked['origin_url'] = url_for('api_origin', origin_id=origin)
        linked['origin_visits_url'] = url_for('api_origin_visits',
                                              origin_id=origin)
        linked['origin_visit_url'] = url_for('api_origin_visit',
                                             origin_id=origin,
                                             visit_id=provenance['visit'])
        return linked

    not_found_msg = 'Content with %s not found.' % q
    return _api_lookup(q,
                       service.lookup_content_provenance,
                       not_found_msg,
                       _add_links)
@app.route('/api/1/filetype//')
@doc.route('/api/1/filetype/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Filetype information (dict) for the matched
content.""")
def api_content_filetype(q):
    """Return the content's filetype information, if any.

    Args:
        q: the content query string handed to
            service.lookup_content_filetype.

    Returns:
        Whatever _api_lookup returns, enriched with
        utils.enrich_metadata_endpoint.

    """
    not_found_msg = 'No filetype information found for content %s.' % q
    return _api_lookup(q,
                       service.lookup_content_filetype,
                       not_found_msg,
                       utils.enrich_metadata_endpoint)
@app.route('/api/1/language//')
@doc.route('/api/1/language/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Language information (dict) for the matched
content.""")
def api_content_language(q):
    """Return the content's language information, if any.

    Args:
        q: the content query string handed to
            service.lookup_content_language.

    Returns:
        Whatever _api_lookup returns, enriched with
        utils.enrich_metadata_endpoint.

    """
    not_found_msg = 'No language information found for content %s.' % q
    return _api_lookup(q,
                       service.lookup_content_language,
                       not_found_msg,
                       utils.enrich_metadata_endpoint)
@app.route('/api/1/license//')
@doc.route('/api/1/license/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""License information (dict) for the matched
content.""")
def api_content_license(q):
    """Return the content's license information, if any.

    Args:
        q: the content query string handed to
            service.lookup_content_license.

    Returns:
        Whatever _api_lookup returns, enriched with
        utils.enrich_metadata_endpoint.

    """
    not_found_msg = 'No license information found for content %s.' % q
    return _api_lookup(q,
                       service.lookup_content_license,
                       not_found_msg,
                       utils.enrich_metadata_endpoint)
@app.route('/api/1/ctags//')
@doc.route('/api/1/ctags/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Ctags symbol (dict) for the matched
content.""")
def api_content_ctags(q):
    """Return the content's ctags symbols, if any.

    Args:
        q: the content query string handed to service.lookup_content_ctags.

    Returns:
        Whatever _api_lookup returns, enriched with
        utils.enrich_metadata_endpoint.

    """
    not_found_msg = 'No ctags symbol found for content %s.' % q
    return _api_lookup(q,
                       service.lookup_content_ctags,
                       not_found_msg,
                       utils.enrich_metadata_endpoint)
@app.route('/api/1/content//raw/')
@doc.route('/api/1/content/raw/', tags=['hidden'])
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc='The raw content data as an octet stream')
def api_content_raw(q):
    """Return the content's raw data if the content is found.

    Args:
        q: the content query string handed to service.lookup_content_raw.

    Returns:
        An app.response_class instance streaming the 'data' entry of the
        looked-up content as an 'application/octet-stream' attachment.

    Raises:
        NotFoundExc: if service.lookup_content_raw returns a falsy value.

    """
    def _stream(found):
        # Generator body: 'data' is only read once the response is consumed.
        yield found['data']

    found_content = service.lookup_content_raw(q)
    if not found_content:
        raise NotFoundExc('Content with %s not found.' % q)

    attachment_headers = {
        'Content-disposition': 'attachment;filename=content_%s_raw' % q,
    }
    return app.response_class(_stream(found_content),
                              headers=attachment_headers,
                              mimetype='application/octet-stream')
@app.route('/api/1/content//')
@doc.route('/api/1/content/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the content identified by q. If content
decoding was successful, it also returns the data""")
def api_content_metadata(q):
    """Return content information if the content is found.

    Args:
        q: the content query string handed to service.lookup_content.

    Returns:
        Whatever _api_lookup returns, enriched with utils.enrich_content.

    """
    not_found_msg = 'Content with %s not found.' % q
    return _api_lookup(q,
                       service.lookup_content,
                       not_found_msg,
                       utils.enrich_content)
@app.route('/api/1/entity//')
@doc.route('/api/1/entity/', tags=['hidden'])
@doc.arg('uuid',
         default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
         argtype=doc.argtypes.uuid,
         argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if uuid is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if an entity matching uuid was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
    """Return entity information if the entity is found.

    Args:
        uuid: identifier handed to service.lookup_entity_by_uuid.

    Returns:
        Whatever _api_lookup returns, enriched with utils.enrich_entity.

    """
    not_found_msg = "Entity with uuid '%s' not found." % uuid
    return _api_lookup(uuid,
                       service.lookup_entity_by_uuid,
                       not_found_msg,
                       utils.enrich_entity)